/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <arpa/inet.h>
#include <fcntl.h>
#include <errno.h>
#include <infiniband/verbs.h>
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#include <unistd.h>
#include <stdio.h>
#include <inttypes.h>
#include <stdint.h>

#include "nvmf_internal.h"
#include "request.h"
#include "session.h"
#include "subsystem.h"
#include "transport.h"
#include "nvmf_spec.h"

#include "core.h"
#include "list.h"
#include "sysy_lib.h"
#include "rdma_event.h"
#include "mem_hugepage.h"

/*
 * RDMA connection resource defaults: scatter/gather entry limits requested
 * for every queue pair created by lich_nvmf_rdma_conn_create().
 */
/* Max SGEs per send work request (used for attr.cap.max_send_sge). */
#define NVMF_DEFAULT_TX_SGE		1
/* Max SGEs per receive work request (used for attr.cap.max_recv_sge);
 * nvmf_post_rdma_recv() posts two: the command capsule and its in-capsule data. */
#define NVMF_DEFAULT_RX_SGE		2


/*
 * RDMA-specific wrapper around a generic NVMf request.
 *
 * The generic request is embedded as the "req" member so that get_rdma_req()
 * can recover the wrapper from a struct lich_nvmf_request pointer.
 */
struct lich_nvmf_rdma_request {
        /* Generic request; handed to the transport-independent layers */
        struct lich_nvmf_request		req;

        /* In Capsule data buffer */
        buffer_t				buf;

        /* Work request storage; a request is either posted as a receive or
         * as a send/RDMA operation, never both at once, hence the union */
        union {
                struct ibv_recv_wr 		recv;
                struct ibv_send_wr		send;
        } wr;
        /* Scatter/gather entries referenced by the work request above:
         * receives use both entries, sends and RDMA READ/WRITE use only [0] */
        struct ibv_sge 				sg_list[2];

        /* Linkage for the connection's pending_rdma_rw_queue */
        struct list_head                        link;
};

/*
 * RDMA-specific state of one NVMf connection (one queue pair).
 *
 * The generic connection is embedded as the "conn" member so that
 * get_rdma_conn() can recover this structure from a struct lich_nvmf_conn
 * pointer.
 */
struct lich_nvmf_rdma_conn {
        /* Generic connection; handed to the transport-independent layers */
        struct lich_nvmf_conn			conn;
        /* fd of the completion channel registered with the epoll instance */
        int                                      fd;
        /* Connection manager id; also owns the queue pair (cm_id->qp) */
        struct rdma_cm_id			*cm_id;
        /* Completion queue shared by the send and receive sides of the QP */
        struct ibv_cq				*cq;

        /* The maximum number of I/O outstanding on this connection at one time */
        uint16_t				max_queue_depth;

        /* The negotiated number of I/O outstanding on this connection at one time */
        uint16_t				negotiated_queue_depth;

        /* The maximum number of active RDMA READ and WRITE operations at one time */
        uint16_t				max_rw_depth;

        /* The current number of I/O outstanding on this connection. This number
         * includes all I/O from the time the capsule is first received until it is
         * completed.
         */
        uint16_t				cur_queue_depth;

        /* The number of RDMA READ and WRITE requests that are outstanding */
        uint16_t				cur_rdma_rw_depth;

        /* Requests that are waiting to obtain a data buffer */
        struct list_head                        pending_data_buf_queue;

        /* Requests that are waiting to perform an RDMA READ or WRITE */
        struct list_head                        pending_rdma_rw_queue;

        /* Array of size "max_queue_depth" containing RDMA requests. */
        struct lich_nvmf_rdma_request		*reqs;

        /* Array of size "max_queue_depth" containing 64 byte capsules
         * used for receive. Points into the first segment of cmd_buf.
         */
        union nvmf_h2c_msg			*cmds;
        /* NOTE(review): not referenced in the visible part of this file */
        struct ibv_mr                           *cmds_mr;
        /* Backing storage for "cmds" */
        buffer_t                                cmd_buf;
        /* Array of size "max_queue_depth" containing 16 byte completions
         * to be sent back to the user. Points into the first segment of cpl_buf.
         */
        union nvmf_c2h_msg			*cpls;
        /* NOTE(review): not referenced in the visible part of this file */
        struct ibv_mr                           *cpls_mr;
        /* Backing storage for "cpls" */
        buffer_t                                cpl_buf;
        /* Array of size "max_queue_depth * InCapsuleDataSize" containing
         * buffers to be used for in capsule data.
         * NOTE(review): the visible code keeps in-capsule data in each
         * request's own "buf" instead; confirm whether bufs/bufs_mr are used.
         */
        void					*bufs;
        struct ibv_mr				*bufs_mr;

        /* Memory region whose lkey is put into every SGE of this connection.
         * Borrowed from the per-device lich_rdma_pd_mr entry, not owned here. */
        struct ibv_mr				*mr;
        /* List linkage (the pending-connection list that used it is commented out) */
        struct list_head                        link;
};

/* List of RDMA connections that have not yet received a CONNECT capsule */
//static struct list_head g_pending_conns = LIST_HEAD_INIT(g_pending_conns);

/*
 * RDMA-specific wrapper around a generic NVMf session.
 *
 * The generic session is embedded as the "session" member so that
 * get_rdma_sess() can recover the wrapper from a struct lich_nvmf_session
 * pointer.
 */
struct lich_nvmf_rdma_session {
        /* Generic session; handed to the transport-independent layers */
        struct lich_nvmf_session		session;
        /* Protection domain associated with the session */
        struct ibv_pd                           *pd;

        /* Memory region associated with the session */
        struct ibv_mr				*mr;
        /* Verbs device context associated with the session */
        struct ibv_context			*verbs;
};

/*
 * Per-address RDMA state.
 *
 * In the visible code nvmf_rdma_connect() also allocates one of these per
 * connect request, using only "attr" and "comp_channel".
 */
struct lich_nvmf_rdma_listen_addr {
        /* Transport address (IPv4) */
        struct in_addr            	        traddr;
        /* Transport service id string (presumably the port; confirm against the listen code) */
        char					*trsvcid;
        /* Connection manager id for this address */
        struct rdma_cm_id			*id;
        /* Device attributes filled in by ibv_query_device() */
        struct ibv_device_attr 			attr;
        /* Completion channel created with ibv_create_comp_channel() */
        struct ibv_comp_channel			*comp_channel;

        /* Linkage for lich_nvmf_rdma.listen_addrs */
        struct list_head                        link;
};

/* Global state of the RDMA transport (single instance: __g_rdma__). */
struct lich_nvmf_rdma {
        /* Connection manager event channel */
        struct rdma_event_channel	*event_channel;
        /* Event descriptor; NOTE(review): its use is outside the visible code */
        struct event_data               *tev;

        /* Mutex for this structure; statically initialised in __g_rdma__ */
        pthread_mutex_t 		lock;

        /* epoll instance to which completion channel fds are added */
        int                             ep_fd;
        /* Target-side upper bound used as the starting point of queue depth negotiation */
        uint16_t 			max_queue_depth;
        /* Largest keyed SGL length accepted for a single request */
        uint32_t 			max_io_size;
        /* Size in bytes of each request's in-capsule data buffer */
        uint32_t 			in_capsule_data_size;

        /* List of struct lich_nvmf_rdma_listen_addr */
        struct list_head                listen_addrs;
};

/*
 * The one and only RDMA transport instance. Only the mutex and the listen
 * address list are initialised statically; every other field starts at zero.
 */
static struct lich_nvmf_rdma __g_rdma__ = {
        .lock = PTHREAD_MUTEX_INITIALIZER,
        .listen_addrs = LIST_HEAD_INIT(__g_rdma__.listen_addrs),
};

/*
 * Per-device RDMA resources, cached in __g_ib_dev_list__ and keyed by the
 * verbs context: one protection domain and one memory region covering the
 * global public memory area (see get_ib_dev_mr()).
 */
struct lich_rdma_pd_mr {
        /* Linkage for __g_ib_dev_list__ */
        struct list_head link;
        /* Memory region registered over the global public memory */
        struct ibv_mr *mr;
        /* Protection domain allocated on ibv_ctxt */
        struct ibv_pd *pd;
        /* Verbs context identifying the device; lookup key in ib_dev_find() */
        struct ibv_context *ibv_ctxt;
};

/* List of struct lich_rdma_pd_mr, one entry per verbs device seen so far. */
static LIST_HEAD(__g_ib_dev_list__);


/**
 * Return the in-capsule data size configured for the RDMA transport.
 *
 * @return __g_rdma__.in_capsule_data_size, converted to int.
 */
int lich_get_rdma_capsule_data_size(void)
{
        return __g_rdma__.in_capsule_data_size;
}

/* Recover the enclosing RDMA connection from its embedded generic "conn" member. */
static inline struct lich_nvmf_rdma_conn *get_rdma_conn(struct lich_nvmf_conn *conn)
{
        uintptr_t base = (uintptr_t)conn;

        base -= offsetof(struct lich_nvmf_rdma_conn, conn);

        return (struct lich_nvmf_rdma_conn *)base;
}

/* Recover the enclosing RDMA request from its embedded generic "req" member. */
static inline struct lich_nvmf_rdma_request *get_rdma_req(struct lich_nvmf_request *req)
{
        uintptr_t base = (uintptr_t)req;

        base -= offsetof(struct lich_nvmf_rdma_request, req);

        return (struct lich_nvmf_rdma_request *)base;
}

/* Recover the enclosing RDMA session from its embedded generic "session" member. */
static inline struct lich_nvmf_rdma_session *get_rdma_sess(struct lich_nvmf_session *sess)
{
        uintptr_t base = (uintptr_t)sess;

        base -= offsetof(struct lich_nvmf_rdma_session, session);

        return (struct lich_nvmf_rdma_session *)base;
}

/* Forward declarations for functions referenced before their definition. */

/* Poll one connection (defined later in this file) */
int lich_nvmf_rdma_poll(struct lich_nvmf_conn *conn);
/* Event handler for the acceptor (defined later in this file) */
static void __lich_nvmf_rdma_acceptor_poll(int fd, int type, int events, void *data, void *core);
/* Post the receive work request of a request; used while building a connection */
static int nvmf_post_rdma_recv(struct lich_nvmf_request *req);
/* Event handler installed on each connection's completion channel fd */
static void __lich_nvmf_rdma_poll(int fd, int type, int events, void *data, void *core);
/* Buffer registration manager helpers (defined later in this file) */
static void * nvmf_init_rdma_buf_reg_mgr(void* pd, void* buf, size_t size);
void nvmf_rdma_buf_dereg_mgr(struct lich_nvmf_conn *conn);

/**
 * Look up the cached per-device entry whose verbs context matches cm_id.
 *
 * @param _dev   Receives the matching entry; left untouched when none matches.
 * @param cm_id  Connection id whose "verbs" context is the lookup key.
 * @return 1 when an entry was found, 0 otherwise.
 */
static int ib_dev_find(struct lich_rdma_pd_mr **_dev, struct rdma_cm_id *cm_id)
{
        struct lich_rdma_pd_mr *entry;
        int found = 0;

        list_for_each_entry(entry, &__g_ib_dev_list__, link) {
                if (entry->ibv_ctxt != cm_id->verbs) {
                        continue;
                }

                *_dev = entry;
                found = 1;
                break;
        }

        return found;
}


/**
 * Register [buf, buf + size) with the protection domain for local write and
 * remote read/write access.
 *
 * @return the new memory region, or NULL (after logging) when registration fails.
 */
static struct ibv_mr *rdma_reg_mr(struct ibv_pd * pd, void* buf, size_t size)
{
        const int access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;
        struct ibv_mr *region = ibv_reg_mr(pd, buf, size, access);

        if (unlikely(region == NULL)) {
                DERROR("ibv_reg_mr failed with mr_flags=0x%x, errno:%d, errmsg:%s\n",
                                access, errno, strerror(errno));
                return NULL;
        }

        return region;
}

/**
 * Return the per-device PD/MR entry for the device behind "id", creating and
 * caching it on first use.
 *
 * A new entry gets a protection domain on id->verbs and a memory region
 * registered over the global public memory area, and is appended to
 * __g_ib_dev_list__.
 *
 * @param id  Connection id; id->verbs selects the device.
 * @return the (possibly new) entry, or NULL on allocation/registration
 *         failure. On failure nothing is left allocated or cached.
 */
static struct lich_rdma_pd_mr *get_ib_dev_mr(struct rdma_cm_id *id)
{
        int ret;
        void *addr = NULL;
        size_t size = 0;
        struct lich_rdma_pd_mr *dev = NULL;

        if (ib_dev_find(&dev, id)) {
                return dev;
        }

        DINFO("New IBv Device need register new mgr.\n");

        ret = ymalloc((void **)&dev, sizeof(*dev));
        if (unlikely(ret)) {
                return NULL;
        }

        dev->ibv_ctxt = id->verbs;
        dev->mr = NULL;
        dev->pd = ibv_alloc_pd(dev->ibv_ctxt);
        if (unlikely(!dev->pd)) {
                DERROR("Failed to alloc IBv pd.\n");
                goto err_free_dev;
        }

        get_global_public_mem(&addr, &size);
        dev->mr = rdma_reg_mr(dev->pd, addr, size);
        if (unlikely(!dev->mr)) {
                DERROR("nvmf hugepge register mr fail.\n");
                goto err_dealloc_pd;
        }

        list_add_tail(&dev->link, &__g_ib_dev_list__);

        return dev;

err_dealloc_pd:
        /* The entry was never published, so the PD has no other users. */
        ibv_dealloc_pd(dev->pd);
err_free_dev:
        /* yfree() is the sysy_lib counterpart of ymalloc(). */
        yfree((void **)&dev);
        return NULL;
}

/**
 * Tear down an RDMA connection and free its memory.
 *
 * Destroys the queue pair and cm_id (when set), then the completion queue,
 * releases the command/completion/in-capsule buffers and finally frees the
 * request array and the connection itself.
 *
 * Safe to call on a partially constructed connection: members that were
 * never set up (still zero from calloc) are skipped instead of dereferenced.
 *
 * @param rdma_conn  Connection to destroy; NULL is ignored.
 */
static void lich_nvmf_rdma_conn_destroy(struct lich_nvmf_rdma_conn *rdma_conn)
{
        int i;

        if (rdma_conn == NULL) {
                return;
        }

        /* rdma_conn->mr is borrowed from the per-device lich_rdma_pd_mr entry
         * and shared by all connections on that device, so it is not
         * deregistered here. */

        if (rdma_conn->cm_id) {
                rdma_destroy_qp(rdma_conn->cm_id);
                rdma_destroy_id(rdma_conn->cm_id);
        }

        /* The CQ can only go away after the QP that references it. */
        if (rdma_conn->cq) {
                ibv_destroy_cq(rdma_conn->cq);
        }

        /* cmds/cpls are assigned right after their backing buffers are
         * initialised, so a NULL pointer means the buffer was never set up. */
        if (rdma_conn->cmds) {
                mbuffer_free(&rdma_conn->cmd_buf);
        }
        if (rdma_conn->cpls) {
                mbuffer_free(&rdma_conn->cpl_buf);
        }

        if (rdma_conn->reqs) {
                for (i = 0; i < rdma_conn->max_queue_depth; i++) {
                        mbuffer_free(&rdma_conn->reqs[i].buf);
                }
        }

        /* Free all memory */
        free(rdma_conn->reqs);
        free(rdma_conn);
}

static struct lich_nvmf_rdma_conn *lich_nvmf_rdma_conn_create(struct rdma_cm_id *id, struct ibv_comp_channel *channel,
                uint16_t max_queue_depth, uint16_t max_rw_depth)
{
        struct lich_nvmf_rdma_conn	*rdma_conn;
        struct lich_nvmf_rdma_request	*rdma_req;
        struct lich_nvmf_conn		*conn;
        int				ret;
        struct ibv_qp_init_attr		attr;
        struct lich_rdma_pd_mr          *dev;

        rdma_conn = calloc(1, sizeof(struct lich_nvmf_rdma_conn));
        if (rdma_conn == NULL) {
                DERROR("Could not allocate new connection.\n");
                return NULL;
        }

        rdma_conn->max_queue_depth = 1;
        rdma_conn->negotiated_queue_depth = max_queue_depth;
        rdma_conn->max_rw_depth = max_rw_depth;
        INIT_LIST_HEAD(&rdma_conn->pending_rdma_rw_queue);

        dev = get_ib_dev_mr(id);
        if (dev == NULL) {
                DERROR("Could not get IBv mgr.\n");
                return NULL;
        }

        rdma_conn->cq = ibv_create_cq(dev->ibv_ctxt, max_queue_depth * 3, rdma_conn, channel, 0);
        if (!rdma_conn->cq) {
                DERROR("Completion Channel: %p Id: %p Verbs: %p\n", channel, id, id->verbs);
                DERROR("Unable to create completion queue, Errno[%d]: %s\n", errno, strerror(errno));
                rdma_destroy_id(id);
                lich_nvmf_rdma_conn_destroy(rdma_conn);
                return NULL;
        }

        ibv_req_notify_cq(rdma_conn->cq, 0);

        struct event_data *tev;
        struct epoll_event ev;
        ret = ymalloc((void **)&tev, sizeof(*tev));
        if (ret) {
                ret = ENOMEM;
                DERROR("Ymalloc failed!\n");
                return NULL;
        }

        tev->data = rdma_conn;
        tev->handler = __lich_nvmf_rdma_poll;
        tev->fd = channel->fd;

        memset(&ev, 0, sizeof(ev));
        ev.events = EPOLLIN;
        ev.data.ptr = tev;
        ret = epoll_ctl(__g_rdma__.ep_fd, EPOLL_CTL_ADD, channel->fd, &ev);
        if (ret) {
                DERROR("Nvmf epoll ctl add fd:%d failed\n", __g_rdma__.event_channel->fd);
                return NULL;
        }
        rdma_conn->fd = channel->fd;

        memset(&attr, 0, sizeof(struct ibv_qp_init_attr));
        attr.qp_type		= IBV_QPT_RC;
        attr.send_cq		= rdma_conn->cq;
        attr.recv_cq		= rdma_conn->cq;
        attr.cap.max_send_wr	= max_queue_depth; /* SEND, READ, and WRITE operations */
        attr.cap.max_recv_wr	= max_queue_depth; /* RECV operations */
        attr.cap.max_send_sge	= NVMF_DEFAULT_TX_SGE;
        attr.cap.max_recv_sge	= NVMF_DEFAULT_RX_SGE;

        ret = rdma_create_qp(id, dev->pd, &attr);
        if (ret) {
                DERROR("rdma_create_qp failed, Errno %d: %s\n", errno, strerror(errno));
                rdma_destroy_id(id);
                lich_nvmf_rdma_conn_destroy(rdma_conn);
                return NULL;
        }

        conn = &rdma_conn->conn;
        conn->transport = &lich_nvmf_transport_rdma;
        id->context = conn;
        rdma_conn->cm_id = id;

        DWARN("New RDMA Connection[%s]: %p, pd: %p, id->verbs: %p \n",
                        inet_ntoa(id->route.addr.dst_sin.sin_addr),
                        rdma_conn, dev->pd, dev->ibv_ctxt);

        rdma_conn->reqs = calloc(max_queue_depth, sizeof(*rdma_conn->reqs));

        mbuffer_init(&rdma_conn->cmd_buf, rdma_conn->max_queue_depth * sizeof(*rdma_conn->cmds));
        rdma_conn->cmds = ((seg_t *)rdma_conn->cmd_buf.list.next)->handler.ptr;

        mbuffer_init(&rdma_conn->cpl_buf, rdma_conn->max_queue_depth * sizeof(*rdma_conn->cpls));
        rdma_conn->cpls = ((seg_t *)rdma_conn->cpl_buf.list.next)->handler.ptr;

        rdma_conn->mr = dev->mr;

        int i;
        for (i = 0; i < rdma_conn->max_queue_depth; i++) {
                rdma_req = &rdma_conn->reqs[i];

                mbuffer_init(&rdma_req->buf, __g_rdma__.in_capsule_data_size);
                rdma_req->req.cmd = &rdma_conn->cmds[i];
                rdma_req->req.rsp = &rdma_conn->cpls[i];
                rdma_req->req.conn = &rdma_conn->conn;

                if (nvmf_post_rdma_recv(&rdma_req->req)) {
                        DERROR("Unable to post capsule for RDMA RECV\n");
                        lich_nvmf_rdma_conn_destroy(rdma_conn);
                        return NULL;
                }

                rdma_req->req.registered  = 1;
        }

        return rdma_conn;
}


/**
 * Reset a send work request and fill in the fields common to SEND,
 * RDMA READ and RDMA WRITE operations.
 *
 * The work request id is set to the address of the RDMA request that
 * embeds "req", and exactly one scatter/gather entry is used.
 *
 * @param wr          Work request to initialise (zeroed first).
 * @param req         Request the work request belongs to.
 * @param sg_list     Scatter/gather list; only the first entry is posted.
 * @param opcode      Verbs opcode of the operation.
 * @param send_flags  Verbs send flags (e.g. IBV_SEND_SIGNALED).
 */
static inline void nvmf_ibv_send_wr_init(struct ibv_send_wr *wr,
                struct lich_nvmf_request *req,
                struct ibv_sge *sg_list,
                enum ibv_wr_opcode opcode,
                int send_flags)
{
        struct lich_nvmf_rdma_request *rdma_req = get_rdma_req(req);
        YASSERT(wr != NULL);
        YASSERT(sg_list != NULL);

        memset(wr, 0, sizeof(*wr));

        /* Go through uintptr_t, like the receive path does, so the pointer
         * is converted the same way on every platform. */
        wr->wr_id = (uint64_t)(uintptr_t)rdma_req;
        wr->opcode = opcode;
        wr->send_flags = send_flags;
        wr->sg_list = sg_list;
        wr->num_sge = 1;
}

/**
 * Copy the remote key and remote address of the command's first SGL
 * descriptor into an RDMA work request. The descriptor must be a keyed
 * data block (asserted).
 */
static inline void nvmf_ibv_send_wr_set_rkey(struct ibv_send_wr *wr, struct lich_nvmf_request *req)
{
        struct spdk_nvme_sgl_descriptor *desc = &req->cmd->nvme_cmd.dptr.sgl1;

        YASSERT(desc->generic.type == SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK);

        wr->wr.rdma.remote_addr = desc->address;
        wr->wr.rdma.rkey = desc->keyed.key;
}

static int nvmf_post_rdma_read(struct lich_nvmf_request *req)
{
        struct ibv_send_wr	*bad_wr = NULL;
        struct lich_nvmf_conn 	*conn = req->conn;
        struct lich_nvmf_rdma_request 	*rdma_req = get_rdma_req(req);
        struct lich_nvmf_rdma_conn 	*rdma_conn = get_rdma_conn(conn);
        //  struct lich_nvmf_rdma_session 	*rdma_sess;
        int 			rc;

        rdma_req->sg_list[0].addr = (uintptr_t)((seg_t *)req->buf->list.next)->handler.ptr;
        rdma_req->sg_list[0].lkey = rdma_conn->mr->lkey;
        rdma_req->sg_list[0].length = req->length;

        nvmf_ibv_send_wr_init(&rdma_req->wr.send, req, rdma_req->sg_list, IBV_WR_RDMA_READ,
                        IBV_SEND_SIGNALED);
        nvmf_ibv_send_wr_set_rkey(&rdma_req->wr.send, req);

        rc = ibv_post_send(rdma_conn->cm_id->qp, &rdma_req->wr.send, &bad_wr);
        if (rc) {
                DERROR("Failure posting rdma read send, rc = 0x%x\n", rc);
        }

        return rc;
}

static int nvmf_post_rdma_write(struct lich_nvmf_request *req)
{
        struct ibv_send_wr	*bad_wr = NULL;
        struct lich_nvmf_conn 	*conn = req->conn;
        struct lich_nvmf_rdma_request 	*rdma_req = get_rdma_req(req);
        struct lich_nvmf_rdma_conn 	*rdma_conn = get_rdma_conn(conn);
        //struct lich_nvmf_rdma_session 	*rdma_sess;
        int 			ret;


        rdma_req->sg_list[0].addr = (uintptr_t)((seg_t *)req->buf->list.next)->handler.ptr;
        rdma_req->sg_list[0].lkey = rdma_conn->mr->lkey;
        //rdma_req->sg_list[0].lkey = ((seg_t *)req->buf->list.next)->handler.iser_lkey;
        rdma_req->sg_list[0].length = req->length;

        nvmf_ibv_send_wr_init(&rdma_req->wr.send, req, rdma_req->sg_list, IBV_WR_RDMA_WRITE,
                        IBV_SEND_SIGNALED);
        nvmf_ibv_send_wr_set_rkey(&rdma_req->wr.send, req);

        ret = ibv_post_send(rdma_conn->cm_id->qp, &rdma_req->wr.send, &bad_wr);
        if (ret) {
                DERROR("Failure posting rdma write send, rc = 0x%x\n", ret);
        }

        return ret;
}

static int nvmf_post_rdma_recv(struct lich_nvmf_request *req)
{
        struct ibv_recv_wr *bad_wr = NULL;
        struct lich_nvmf_conn *conn = req->conn;
        struct lich_nvmf_rdma_conn *rdma_conn = get_rdma_conn(conn);
        struct lich_nvmf_rdma_request *rdma_req = get_rdma_req(req);
        int ret;

        if (req->registered)
                return 0;

        rdma_req->sg_list[0].addr = (uintptr_t)req->cmd;
        rdma_req->sg_list[0].length = sizeof(*req->cmd);
        rdma_req->sg_list[0].lkey = rdma_conn->mr->lkey;
        //rdma_req->sg_list[0].lkey = ((seg_t *)rdma_conn->cmd_buf.list.next)->handler.iser_lkey;
        DBUG("the rdma_req cmd %p, lkey %u\n", req->cmd, rdma_req->sg_list[0].lkey);

        rdma_req->sg_list[1].addr = (uintptr_t)((seg_t *)rdma_req->buf.list.next)->handler.ptr;
        rdma_req->sg_list[1].length = __g_rdma__.in_capsule_data_size;
        rdma_req->sg_list[1].lkey = rdma_conn->mr->lkey;
        //rdma_req->sg_list[1].lkey = ((seg_t *)rdma_req->buf.list.next)->handler.iser_lkey;
        DBUG("the rdma_req buf %p, lkey %u\n", (void *)rdma_req->sg_list[1].addr, rdma_req->sg_list[1].lkey);

        memset(&rdma_req->wr.recv, 0, sizeof(struct ibv_recv_wr));
        rdma_req->wr.recv.wr_id = (uintptr_t)rdma_req;
        rdma_req->wr.recv.next = NULL;
        rdma_req->wr.recv.sg_list = rdma_req->sg_list;
        rdma_req->wr.recv.num_sge = 2;

        ret = ibv_post_recv(rdma_conn->cm_id->qp, &rdma_req->wr.recv, &bad_wr);
        if (ret) {
                DERROR("Failure posting rdma recv, rc = 0x%x\n", ret);
                YASSERT(0);
        }

        return ret;
}

static int nvmf_post_rdma_send(struct lich_nvmf_request *req)
{
        struct ibv_send_wr	*bad_wr = NULL;
        struct lich_nvmf_conn 	*conn = req->conn;
        struct lich_nvmf_rdma_request 	*rdma_req = get_rdma_req(req);
        struct lich_nvmf_rdma_conn 	*rdma_conn = get_rdma_conn(conn);
        int 			rc;


        rdma_req->sg_list[0].addr = (uintptr_t)req->rsp;
        rdma_req->sg_list[0].length = sizeof(*req->rsp);
        rdma_req->sg_list[0].lkey = rdma_conn->mr->lkey;
        //rdma_req->sg_list[0].lkey = ((seg_t *)rdma_conn->cpl_buf.list.next)->handler.iser_lkey;

        nvmf_ibv_send_wr_init(&rdma_req->wr.send, req, rdma_req->sg_list, IBV_WR_SEND, IBV_SEND_SIGNALED);

        rc = ibv_post_send(rdma_conn->cm_id->qp, &rdma_req->wr.send, &bad_wr);
        if (rc) {
                DERROR("Failure posting rdma send for NVMf completion, rc = 0x%x\n", rc);
        }

        return rc;
}

/**
 * REQUEST COMPLETION HANDLING
 *
 * Request completion consists of three steps:
 *
 * 1) Transfer any data to the host using an RDMA Write. If no data or an NVMe write,
 *    this step is unnecessary. (spdk_nvmf_rdma_request_transfer_data)
 * 2) Upon transfer completion, update sq_head, re-post the recv capsule,
 *    and send the completion. (spdk_nvmf_rdma_request_send_completion)
 * 3) Upon getting acknowledgement of the completion, decrement the internal
 *    count of number of outstanding requests. (spdk_nvmf_rdma_request_ack_completion)
 *
 * There are two public interfaces to initiate the process of completing a request,
 * exposed as callbacks in the transport layer.
 *
 * 1) spdk_nvmf_rdma_request_complete, which attempts to do all three steps.
 * 2) spdk_nvmf_rdma_request_release, which skips straight to step 3.
 **/

/**
 * Start the data transfer of a request, or queue it when the connection
 * already has max_rw_depth RDMA READ/WRITE operations in flight.
 *
 * Controller-to-host requests are sent with an RDMA WRITE, host-to-controller
 * requests are fetched with an RDMA READ. The request must carry data
 * (asserted).
 *
 * @return 0 when the transfer was posted or queued, -1 when posting failed.
 */
static int lich_nvmf_rdma_request_transfer_data(struct lich_nvmf_request *req)
{
        struct lich_nvmf_rdma_request *rreq = get_rdma_req(req);
        struct lich_nvmf_rdma_conn *rconn = get_rdma_conn(req->conn);

        YASSERT(req->xfer != SPDK_NVME_DATA_NONE);

        /* No free RDMA slot: park the request until one completes */
        if (rconn->cur_rdma_rw_depth >= rconn->max_rw_depth) {
                list_add_tail(&rreq->link, &rconn->pending_rdma_rw_queue);
                return 0;
        }

        if (req->xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) {
                if (nvmf_post_rdma_write(req) != 0) {
                        DERROR("Unable to transfer data from target to host\n");
                        return -1;
                }
        } else if (req->xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER) {
                if (nvmf_post_rdma_read(req) != 0) {
                        DERROR("Unable to transfer data from host to target\n");
                        return -1;
                }
        }

        rconn->cur_rdma_rw_depth++;

        return 0;
}

/**
 * Finish a request: release its data buffer, advance the submission queue
 * head, re-post the receive capsule and send the completion to the host.
 *
 * @return 0 on success, otherwise the error of the failing post (logged).
 */
static int lich_nvmf_rdma_request_send_completion(struct lich_nvmf_request *req)
{
        struct lich_nvmf_conn *conn = req->conn;
        int rc;

        /* Put the buffer back in the pool */
        if (req->length > 0) {
                mbuffer_free(req->buf);
                req->length = 0;
        }

        /* Advance sq_head, wrapping to 0 after sq_head_max, and report the
         * new head in the completion */
        conn->sq_head = (conn->sq_head == conn->sq_head_max) ? 0 : conn->sq_head + 1;
        req->rsp->nvme_cpl.sqhd = conn->sq_head;

        /* Post the capsule to the recv buffer */
        rc = nvmf_post_rdma_recv(req);
        if (rc != 0) {
                DERROR("Unable to re-post rx descriptor\n");
                return rc;
        }

        /* Send the completion */
        rc = nvmf_post_rdma_send(req);
        if (rc != 0) {
                DERROR("Unable to send response capsule\n");
        }

        return rc;
}

static int lich_nvmf_rdma_request_ack_completion(struct lich_nvmf_request *req)
{
        struct lich_nvmf_conn *conn = req->conn;
        struct lich_nvmf_rdma_conn *rdma_conn = get_rdma_conn(conn);

        /* Advance our sq_head pointer */
        if (conn->sq_head == conn->sq_head_max) {
                conn->sq_head = 0;
        } else {
                conn->sq_head++;
        }

        rdma_conn->cur_queue_depth--;

        return 0;
}

static int nvmf_rdma_connect(struct rdma_cm_event *event)
{
        struct lich_nvmf_rdma_conn	*rdma_conn = NULL;
        struct rdma_conn_param		*rdma_param = NULL;
        struct rdma_conn_param		ctrlr_event_data;
        struct rdma_cm_id               *cm_id = event->id;
        struct lich_nvmf_rdma_listen_addr *addr = NULL;
        const struct spdk_nvmf_rdma_request_private_data *private_data = NULL;
        struct spdk_nvmf_rdma_accept_private_data accept_data;
        uint16_t			sts = 0;
        uint16_t			max_queue_depth;
        uint16_t			max_rw_depth;
        int 				rc;

        if (event->id == NULL) {
                DERROR("connect request: missing cm_id\n");
                goto err;
        }

        addr = calloc(1, sizeof(*addr));
        if (!addr) {
                goto err0;
        }

        rc = ibv_query_device(cm_id->verbs, &addr->attr);
        if (rc < 0) {
                DERROR("Failed to query RDMA device attributes.\n");
                goto err0;
        }

        addr->comp_channel = ibv_create_comp_channel(cm_id->verbs);
        if (!addr->comp_channel) {
                DERROR("Failed to create completion channel\n");
                goto err0;
        }

        rc = fcntl(addr->comp_channel->fd, F_SETFL, O_NONBLOCK);
        if (rc < 0) {
                DERROR("fcntl to set comp channel to non-blocking failed\n");
                goto err0;
        }

        if (event->id->verbs == NULL) {
                DERROR("connect request: missing cm_id ibv_context\n");
                goto err0;
        }

        DINFO("Connect Recv on fabric intf name %s, dev_name %s\n",
                        event->id->verbs->device->name, event->id->verbs->device->dev_name);

        //addr = event->listen_id->context;

        /* Figure out the supported queue depth. This is a multi-step process
         * that takes into account hardware maximums, host provided values,
         * and our target's internal memory limits */

        DBUG("Calculating Queue Depth\n");

        /* Start with the maximum queue depth allowed by the target */
        max_queue_depth = __g_rdma__.max_queue_depth;
        max_rw_depth = __g_rdma__.max_queue_depth;
        DINFO("Target Max Queue Depth: %d\n", __g_rdma__.max_queue_depth);

        /* Next check the local NIC's hardware limitations */
        DBUG("Local NIC Max Send/Recv Queue Depth: %d Max Read/Write Queue Depth: %d\n",
                        addr->attr.max_qp_wr, addr->attr.max_qp_rd_atom);
        max_queue_depth = nvmf_min(max_queue_depth, addr->attr.max_qp_wr);
        max_rw_depth = nvmf_min(max_rw_depth, addr->attr.max_qp_rd_atom);

        /* Next check the remote NIC's hardware limitations */
        rdma_param = &event->param.conn;
        DBUG("Host (Initiator) NIC Max Incoming RDMA R/W operations: %d Max Outgoing RDMA R/W operations: %d\n",
                        rdma_param->initiator_depth, rdma_param->responder_resources);
        if (rdma_param->initiator_depth > 0) {
                max_rw_depth = nvmf_min(max_rw_depth, rdma_param->initiator_depth);
        }

        /* Finally check for the host software requested values, which are
         * optional. */
        if (rdma_param->private_data != NULL &&
                        rdma_param->private_data_len >= sizeof(struct spdk_nvmf_rdma_request_private_data)) {
                private_data = rdma_param->private_data;
                DBUG("Host Receive Queue Size: %d\n", private_data->hrqsize);
                DBUG("Host Send Queue Size: %d\n", private_data->hsqsize);
                max_queue_depth = nvmf_min(max_queue_depth, private_data->hrqsize);
                max_queue_depth = nvmf_min(max_queue_depth, private_data->hsqsize);
        }

        DINFO("Final Negotiated Queue Depth: %d R/W Depth: %d\n",
                        max_queue_depth, max_rw_depth);

        /* Init the NVMf rdma transport connection */
        rdma_conn = lich_nvmf_rdma_conn_create(event->id, addr->comp_channel, max_queue_depth,
                        max_rw_depth);
        if (rdma_conn == NULL) {
                DERROR("Error on nvmf connection creation\n");
                goto err1;
        }

        accept_data.recfmt = 0;
        accept_data.crqsize = max_queue_depth;
        ctrlr_event_data = *rdma_param;
        ctrlr_event_data.private_data = &accept_data;
        ctrlr_event_data.private_data_len = sizeof(accept_data);
        if (event->id->ps == RDMA_PS_TCP) {
                ctrlr_event_data.responder_resources = -1; /* We accept 0 reads from the host */
                ctrlr_event_data.initiator_depth = max_rw_depth;
        }

        rc = rdma_accept(event->id, &ctrlr_event_data);
        if (rc) {
                DERROR("Error on rdma_accept\n");
                goto err2;
        }

        /* Add this RDMA connection to the global list until a CONNECT capsule
         * is received. */
        //list_add_tail(&rdma_conn->link, &g_pending_conns);

        return 0;

err2:
        lich_nvmf_rdma_conn_destroy(rdma_conn);

err1: {
              struct spdk_nvmf_rdma_reject_private_data rej_data;

              rej_data.status.sc = sts;
              rdma_reject(event->id, &ctrlr_event_data, sizeof(rej_data));
      }
err0:
      free(addr);
err:
      return -1;
}

static int nvmf_rdma_disconnect(struct rdma_cm_event *evt)
{
        struct lich_nvmf_conn		*conn;
        struct lich_nvmf_session	*session;
        struct lich_nvmf_subsystem	*subsystem;
        struct lich_nvmf_rdma_conn 	*rdma_conn;

        if (evt->id == NULL) {
                DERROR("disconnect request: missing cm_id\n");
                return -1;
        }

        conn = evt->id->context;
        if (conn == NULL) {
                DERROR("disconnect request: no active connection\n");
                return -1;
        }
        /* ack the disconnect event before rdma_destroy_id */
        rdma_ack_cm_event(evt);

        rdma_conn = get_rdma_conn(conn);

        session = conn->sess;
        if (session == NULL) {
                /* No session has been established yet. That means the conn
                 * must be in the pending connections list. Remove it. */
                //list_del(&rdma_conn->link);
                lich_nvmf_rdma_conn_destroy(rdma_conn);
                return 0;
        }

        subsystem = session->subsys;

        subsystem->disconnect_cb(subsystem, conn);

        return 0;
}

/*
 * Printable names of the RDMA CM event types, for logging.
 * NOTE(review): presumably indexed by the event type value of a
 * struct rdma_cm_event; confirm that the order matches the librdmacm enum.
 */
static const char *CM_EVENT_STR[] = {
        "RDMA_CM_EVENT_ADDR_RESOLVED",
        "RDMA_CM_EVENT_ADDR_ERROR",
        "RDMA_CM_EVENT_ROUTE_RESOLVED",
        "RDMA_CM_EVENT_ROUTE_ERROR",
        "RDMA_CM_EVENT_CONNECT_REQUEST",
        "RDMA_CM_EVENT_CONNECT_RESPONSE",
        "RDMA_CM_EVENT_CONNECT_ERROR",
        "RDMA_CM_EVENT_UNREACHABLE",
        "RDMA_CM_EVENT_REJECTED",
        "RDMA_CM_EVENT_ESTABLISHED",
        "RDMA_CM_EVENT_DISCONNECTED",
        "RDMA_CM_EVENT_DEVICE_REMOVAL",
        "RDMA_CM_EVENT_MULTICAST_JOIN",
        "RDMA_CM_EVENT_MULTICAST_ERROR",
        "RDMA_CM_EVENT_ADDR_CHANGE",
        "RDMA_CM_EVENT_TIMEWAIT_EXIT"
};

/* Result of preparing a request's data (see lich_nvmf_request_prep_data()). */
typedef enum _lich_nvmf_request_prep_type {
        /* The request is invalid; a status code has been set in the response */
        LICH_NVMF_REQUEST_PREP_ERROR = -1,
        /* The request needs no further preparation (e.g. it carries no data) */
        LICH_NVMF_REQUEST_PREP_READY = 0,
        /* Presumably: waiting for a data buffer to become available (confirm against callers) */
        LICH_NVMF_REQUEST_PREP_PENDING_BUFFER = 1,
        /* Presumably: waiting for the data transfer from the host (confirm against callers) */
        LICH_NVMF_REQUEST_PREP_PENDING_DATA = 2,
} lich_nvmf_request_prep_type;

/*
 * Decode the first SGL descriptor of a received command and attach a data
 * buffer to the request.
 *
 * req->length and req->data are reset, and req->xfer is derived from the
 * opcode (from the fabrics command type for fabrics commands). Two SGL forms
 * are accepted:
 *
 *  - keyed data block (address or invalidate-key subtype): the length is
 *    checked against __g_rdma__.max_io_size. Lengths above the in-capsule
 *    size get a newly initialised buffer of that length; smaller ones reuse
 *    the request's capsule buffer by reference.
 *  - unkeyed data block with offset subtype (in-capsule data): offset and
 *    length are checked against __g_rdma__.in_capsule_data_size and the
 *    capsule buffer is referenced starting at the given offset.
 *
 * Returns LICH_NVMF_REQUEST_PREP_READY when the request can be executed,
 * LICH_NVMF_REQUEST_PREP_PENDING_DATA when host-to-controller data must be
 * transferred first, and LICH_NVMF_REQUEST_PREP_ERROR (with rsp->status.sc
 * set) when the descriptor is rejected.
 */
static lich_nvmf_request_prep_type lich_nvmf_request_prep_data(struct lich_nvmf_request *req)
{
        struct spdk_nvme_cmd		*cmd = &req->cmd->nvme_cmd;
        struct spdk_nvme_cpl		*rsp = &req->rsp->nvme_cpl;
        struct lich_nvmf_rdma_request	*rdma_req = get_rdma_req(req);
        //  struct lich_nvmf_rdma_session	*rdma_sess;
        struct spdk_nvme_sgl_descriptor *sgl;
        req->length = 0;
        req->data = NULL;

        /* Fabrics commands encode their transfer direction in the command type. */
        if (cmd->opc == SPDK_NVME_OPC_FABRIC) {
                req->xfer = spdk_nvme_opc_get_data_transfer(req->cmd->nvmf_cmd.fctype);
        } else {
                req->xfer = spdk_nvme_opc_get_data_transfer(cmd->opc);
        }

        /* Commands without a data phase need no buffer at all. */
        if (req->xfer == SPDK_NVME_DATA_NONE) {
                return LICH_NVMF_REQUEST_PREP_READY;
        }

        sgl = &cmd->dptr.sgl1;

        if (sgl->generic.type == SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK &&
                        (sgl->keyed.subtype == SPDK_NVME_SGL_SUBTYPE_ADDRESS ||
                         sgl->keyed.subtype == SPDK_NVME_SGL_SUBTYPE_INVALIDATE_KEY)) {
                if (sgl->keyed.length > __g_rdma__.max_io_size) {
                        DERROR("SGL length 0x%x exceeds max io size 0x%x\n",
                                        sgl->keyed.length, __g_rdma__.max_io_size);
                        rsp->status.sc = SPDK_NVME_SC_DATA_SGL_LENGTH_INVALID;
                        return LICH_NVMF_REQUEST_PREP_ERROR;
                }

                /* A zero-length descriptor is treated as "no data transfer". */
                if (sgl->keyed.length == 0) {
                        req->xfer = SPDK_NVME_DATA_NONE;
                        return LICH_NVMF_REQUEST_PREP_READY;
                }

                /* NOTE(review): this assignment is overwritten in both branches below. */
                req->buf = &rdma_req->buf;
                req->length = sgl->keyed.length;

                /* TODO: In Capsule Data Size should be tracked per queue (admin, for instance, should always have 4k and no more). */
                if (sgl->keyed.length > __g_rdma__.in_capsule_data_size) {
                        /* Too large for the capsule buffer: set up a buffer of the requested length. */
                        mbuffer_init(&req->_buf, sgl->keyed.length);
                        req->buf = &req->_buf;
                } else {
                        /* Use the in capsule data buffer, even though this isn't in capsule data */
                        mbuffer_init(&req->_buf, 0);
                        mbuffer_reference(&req->_buf, &rdma_req->buf);
                        req->buf = &req->_buf;
                        /* Trim the buffer and the segment at the head of its list to the SGL length. */
                        req->buf->len = sgl->keyed.length;
                        ((seg_t *)req->buf->list.next)->len = req->length;
                }

                /* Data written by the host still has to be fetched before execution. */
                if (req->xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER) {
                        return LICH_NVMF_REQUEST_PREP_PENDING_DATA;
                } else {
                        return LICH_NVMF_REQUEST_PREP_READY;
                }
        } else if (sgl->generic.type == SPDK_NVME_SGL_TYPE_DATA_BLOCK &&
                        sgl->unkeyed.subtype == SPDK_NVME_SGL_SUBTYPE_OFFSET) {
                uint32_t max_len = __g_rdma__.in_capsule_data_size;

                uint64_t offset = sgl->address;

                if (offset > max_len) {
                        DERROR("In-capsule offset 0x%" PRIx64 " exceeds capsule length 0x%x\n",
                                        offset, max_len);
                        rsp->status.sc = SPDK_NVME_SC_INVALID_SGL_OFFSET;
                        return LICH_NVMF_REQUEST_PREP_ERROR;
                }
                /* From here on max_len is the space remaining after the offset. */
                max_len -= (uint32_t)offset;

                if (sgl->unkeyed.length > max_len) {
                        DERROR("In-capsule data length 0x%x exceeds capsule length 0x%x\n",
                                        sgl->unkeyed.length, max_len);
                        rsp->status.sc = SPDK_NVME_SC_DATA_SGL_LENGTH_INVALID;
                        return LICH_NVMF_REQUEST_PREP_ERROR;
                }

                /* A zero-length descriptor is treated as "no data transfer". */
                if (sgl->unkeyed.length == 0) {
                        req->xfer = SPDK_NVME_DATA_NONE;
                        return LICH_NVMF_REQUEST_PREP_READY;
                }

                /* Reference the capsule buffer, then shift the head segment to the data offset. */
                mbuffer_init(&req->_buf, 0);
                mbuffer_reference(&req->_buf, &rdma_req->buf);
                req->buf = &req->_buf;
                ((seg_t *)req->buf->list.next)->handler.ptr += offset;
                ((seg_t *)req->buf->list.next)->handler.phyaddr += offset;
                req->length = sgl->unkeyed.length;
                req->buf->len = req->length;
                ((seg_t *)req->buf->list.next)->len = req->length;

                YASSERT(req->length == req->buf->len);
                return LICH_NVMF_REQUEST_PREP_READY;
        }

        /* Any other descriptor type/subtype combination is rejected. */
        DERROR("Invalid NVMf I/O Command SGL:  Type 0x%x, Subtype 0x%x\n",
                        sgl->generic.type, sgl->generic.subtype);
        rsp->status.sc = SPDK_NVME_SC_SGL_DESCRIPTOR_TYPE_INVALID;
        return LICH_NVMF_REQUEST_PREP_ERROR;
}

static int lich_nvmf_rdma_handle_pending_rdma_rw(struct lich_nvmf_conn *conn)
{
        struct lich_nvmf_rdma_conn	*rdma_conn = get_rdma_conn(conn);
        // struct lich_nvmf_rdma_session	*rdma_sess;
        struct lich_nvmf_rdma_request	*rdma_req;
        int ret;
        int count = 0;

        /* Try to initiate RDMA Reads or Writes on requests that have data buffers */
        while (rdma_conn->cur_rdma_rw_depth < rdma_conn->max_rw_depth) {
                if (list_empty(&rdma_conn->pending_rdma_rw_queue)) {
                        break;
                }

                rdma_req = list_entry((&rdma_conn->pending_rdma_rw_queue)->next,
                                struct lich_nvmf_rdma_request, link);
                list_del(&rdma_req->link);


                ret = lich_nvmf_rdma_request_transfer_data(&rdma_req->req);
                if (ret) {
                        return -1;
                }
        }

        return count;
}

/* Public API callbacks begin here */

static int lich_nvmf_rdma_init(uint16_t max_queue_depth, uint32_t max_io_size,
                uint32_t in_capsule_data_size)
{
        int ret = 0;

        __g_rdma__.max_queue_depth = max_queue_depth;
        __g_rdma__.max_io_size = max_io_size;
        __g_rdma__.in_capsule_data_size = in_capsule_data_size;

        __g_rdma__.event_channel = rdma_create_event_channel();
        if (__g_rdma__.event_channel == NULL) {
                DERROR("rdma_create_event_channel() failed, %s\n", strerror(errno));
                GOTO(err_ret, ret);
        }

        ret = fcntl(__g_rdma__.event_channel->fd, F_SETFL, O_NONBLOCK);
        if (ret < 0) {
                DERROR("fcntl to set fd to non-blocking failed\n");
                GOTO(err_ret, ret);
        }

        __g_rdma__.ep_fd = epoll_create(16);
        if (__g_rdma__.ep_fd < 0) {
                ret = errno;
                GOTO(err_ret, ret);
        }


        return 0;

err_ret:
        return ret;
}

/*
 * Tear down the global RDMA transport state.
 *
 * Under the transport lock, every listen address is unlinked, its completion
 * channel (if one was created) and cm_id are destroyed and the entry itself
 * is released. The event channel is destroyed last and its pointer cleared.
 *
 * Always returns 0.
 */
static int lich_nvmf_rdma_fini(void)
{
        struct lich_nvmf_rdma_listen_addr *addr;
        struct list_head *pos, *tmp;

        pthread_mutex_lock(&__g_rdma__.lock);
        list_for_each_safe(pos, tmp, &__g_rdma__.listen_addrs) {
                addr = list_entry(pos, struct lich_nvmf_rdma_listen_addr, link);
                list_del(pos);

                /* The listen path in this file does not create a completion
                 * channel, so the pointer may still be NULL. */
                if (addr->comp_channel != NULL) {
                        ibv_destroy_comp_channel(addr->comp_channel);
                }
                rdma_destroy_id(addr->id);

                /* The entry was heap-allocated when the listener was added. */
                free(addr);
        }

        if (__g_rdma__.event_channel != NULL) {
                rdma_destroy_event_channel(__g_rdma__.event_channel);
                /* Clear the pointer so later users see the channel is gone. */
                __g_rdma__.event_channel = NULL;
        }
        pthread_mutex_unlock(&__g_rdma__.lock);

        return 0;
}

/*
 * Event handler for the connection-manager event channel.
 *
 * Drains every event currently queued on __g_rdma__.event_channel and
 * dispatches it: connect requests go to nvmf_rdma_connect(), teardown-type
 * events go to nvmf_rdma_disconnect(). All handler arguments are unused.
 *
 * Polling of not-yet-established connections for their CONNECT capsule used
 * to happen here as well; that code is disabled.
 */
static void __lich_nvmf_rdma_acceptor_poll(int fd, int type, int events, void *data, void *core)
{
        struct rdma_cm_event    *cm_evt;
        int                     status;

        (void)fd;
        (void)type;
        (void)events;
        (void)data;
        (void)core;

        if (NULL == __g_rdma__.event_channel) {
                DERROR("*** Nvmf Rdma Event Channel Create failed! ***\n");
                return;
        }

        for (;;) {
                status = rdma_get_cm_event(__g_rdma__.event_channel, &cm_evt);
                if (status != 0) {
                        /* An empty non-blocking channel is the normal way out. */
                        if (errno != EAGAIN && errno != EWOULDBLOCK) {
                                DERROR("Acceptor Event Error: %s\n", strerror(errno));
                        }
                        return;
                }

                DBUG("*** Nvmf Acceptor Get Event: %s ***\n", CM_EVENT_STR[cm_evt->event]);

                switch (cm_evt->event) {
                case RDMA_CM_EVENT_CONNECT_REQUEST:
                        status = nvmf_rdma_connect(cm_evt);
                        if (status < 0) {
                                DERROR("Unable to process connect event. rc: %d\n", status);
                        }
                        break;

                case RDMA_CM_EVENT_ESTABLISHED:
                        /* Nothing to do besides acknowledging the event. */
                        break;

                case RDMA_CM_EVENT_ADDR_CHANGE:
                case RDMA_CM_EVENT_DISCONNECTED:
                case RDMA_CM_EVENT_DEVICE_REMOVAL:
                case RDMA_CM_EVENT_TIMEWAIT_EXIT:
                        status = nvmf_rdma_disconnect(cm_evt);
                        if (status >= 0) {
                                /* The disconnect handler acknowledged the event itself. */
                                continue;
                        }
                        DERROR("Unable to process disconnect event. rc: %d\n", status);
                        break;

                default:
                        DERROR("Unexpected Acceptor Event [%d]\n", cm_evt->event);
                        break;
                }

                rdma_ack_cm_event(cm_evt);
        }
}

/*
 * Start listening for RDMA connections on the given address.
 *
 * If a listener with the same service id (port) is already registered, the
 * call is a no-op and returns 0. This check is done before any cm_id is
 * created or bound, so a repeated request is not turned into a bind failure.
 *
 * Otherwise a cm_id is created on the global event channel, bound to the
 * IPv4 address/port, put into listening state and appended to
 * __g_rdma__.listen_addrs. Per-listener device attribute query and
 * completion channel creation are currently disabled.
 *
 * Returns 0 on success (or when already listening), -1 on failure.
 */
static int lich_nvmf_rdma_listen(struct lich_nvmf_listen_addr *listen_addr)
{
        struct lich_nvmf_rdma_listen_addr *addr = NULL, *taddr;
        struct sockaddr_in saddr;
        struct list_head *pos, *tmp;
        int rc = -1;
        int ret;

        pthread_mutex_lock(&__g_rdma__.lock);
        YASSERT(__g_rdma__.event_channel != NULL);

        list_for_each_safe(pos, tmp, &__g_rdma__.listen_addrs) {
                taddr = list_entry(pos, struct lich_nvmf_rdma_listen_addr, link);
                if ((!strcasecmp(taddr->trsvcid, listen_addr->trsvcid))) {
                        /* Already listening at this address */
                        rc = 0;
                        goto out_unlock;
                }
        }

        addr = calloc(1, sizeof(*addr));
        if (!addr) {
                goto out_unlock;
        }

        addr->traddr = listen_addr->traddr;
        addr->trsvcid = listen_addr->trsvcid;

        ret = rdma_create_id(__g_rdma__.event_channel, &addr->id, NULL, RDMA_PS_TCP);
        if (ret < 0) {
                DERROR("rdma_create_id() failed\n");
                goto out_free;
        }

        memset(&saddr, 0, sizeof(saddr));
        saddr.sin_family = AF_INET;
        saddr.sin_addr = addr->traddr;
        saddr.sin_port = htons((uint16_t)strtoul(addr->trsvcid, NULL, 10));
        ret = rdma_bind_addr(addr->id, (struct sockaddr *)&saddr);
        if (ret < 0) {
                DERROR("rdma_bind_addr() failed\n");
                goto out_destroy_id;
        }

        ret = rdma_listen(addr->id, 10); /* 10 = backlog */
        if (ret < 0) {
                DERROR("rdma_listen() failed\n");
                goto out_destroy_id;
        }

        /* Without a device context the listener is rejected. */
        if (!addr->id->verbs) {
                DERROR("rdma listen id has no verbs context\n");
                goto out_destroy_id;
        }

        list_add_tail(&addr->link, &__g_rdma__.listen_addrs);

        /* Log while still holding the lock: once the entry is on the shared
         * list and the lock is dropped, it is no longer exclusively ours. */
        DINFO("*** NVMf Target Listening on %s port %d ***\n",
                        inet_ntoa(addr->traddr), ntohs(rdma_get_src_port(addr->id)));

        pthread_mutex_unlock(&__g_rdma__.lock);
        return 0;

out_destroy_id:
        rdma_destroy_id(addr->id);
out_free:
        free(addr);
out_unlock:
        pthread_mutex_unlock(&__g_rdma__.lock);
        return rc;
}

/*
 * Fill the transport-specific fields of a discovery log page entry for the
 * given listen address: RDMA transport over IPv4, reliable-connected queue
 * pairs managed through RDMA CM, with space-padded address and service id.
 */
static void lich_nvmf_rdma_discover(struct lich_nvmf_listen_addr *listen_addr,
                struct spdk_nvmf_discovery_log_page_entry *entry)
{
        char *ip_text;

        /* Transport identification. */
        entry->trtype = SPDK_NVMF_TRTYPE_RDMA;
        entry->adrfam = SPDK_NVMF_ADRFAM_IPV4;
        entry->treq.secure_channel = SPDK_NVMF_TREQ_SECURE_CHANNEL_NOT_SPECIFIED;

        /* RDMA-specific address subtype. */
        entry->tsas.rdma.rdma_qptype = SPDK_NVMF_RDMA_QPTYPE_RELIABLE_CONNECTED;
        entry->tsas.rdma.rdma_prtype = SPDK_NVMF_RDMA_PRTYPE_NONE;
        entry->tsas.rdma.rdma_cms = SPDK_NVMF_RDMA_CMS_RDMA_CM;

        /* Both string fields are padded with spaces up to their full size. */
        lich_strcpy_pad(entry->trsvcid, listen_addr->trsvcid, sizeof(entry->trsvcid), ' ');

        ip_text = inet_ntoa(listen_addr->traddr);
        lich_strcpy_pad(entry->traddr, ip_text, sizeof(entry->traddr), ' ');
}

/*
 * Allocate a zero-initialised RDMA session and return its embedded generic
 * session, already pointing at the RDMA transport. Returns NULL when the
 * allocation fails.
 */
static struct lich_nvmf_session * lich_nvmf_rdma_session_init(void)
{
        struct lich_nvmf_rdma_session *new_sess = calloc(1, sizeof(*new_sess));

        if (new_sess == NULL) {
                return NULL;
        }

        /* No protection domain yet; it is assigned when a connection is added. */
        new_sess->pd = NULL;
        new_sess->session.transport = &lich_nvmf_transport_rdma;

        return &new_sess->session;
}

/**
  static void *nvmf_init_rdma_buf_dereg_mgr(void *arg1, void *arg2, size_t size)
  {
  (void)arg2;
  (void)size;
  ibv_dereg_mr((struct ibv_mr *)arg1);

  return NULL;
  }**/

/*
 * Release the RDMA session that embeds the given generic session.
 * No memory region is deregistered here; only the session memory is freed.
 */
static void lich_nvmf_rdma_session_fini(struct lich_nvmf_session *session)
{
        struct lich_nvmf_rdma_session *owner = get_rdma_sess(session);

        if (owner != NULL) {
                free(owner);
        }
}


/*
 * Register a buffer as an RDMA memory region with local-write, remote-read
 * and remote-write access.
 *
 * pd   - protection domain, passed as an opaque pointer
 * buf  - start of the memory to register
 * size - number of bytes to register
 *
 * Returns the memory region on success, NULL (after logging) on failure.
 */
static void * nvmf_init_rdma_buf_reg_mgr(void* pd, void* buf, size_t size)
{
        int access_flags = IBV_ACCESS_LOCAL_WRITE
                | IBV_ACCESS_REMOTE_READ
                | IBV_ACCESS_REMOTE_WRITE;
        struct ibv_mr *region;

        region = ibv_reg_mr((struct ibv_pd *)pd, buf, size, access_flags);
        if (region != NULL) {
                return region;
        }

        DERROR("ibv_reg_mr failed with mr_flags=0x%x, errno:%d, errmsg:%s\n",
                        access_flags, errno, strerror(errno));
        return NULL;
}

/*
 * Attach a connection to a session.
 *
 * All connections of one session must use the same RDMA device; a mismatch
 * is rejected. The session then adopts the connection's device context and
 * protection domain.
 *
 * When called on a core thread (core_self() != NULL) the connection's
 * receive resources are rebuilt for the negotiated queue depth: the existing
 * command/completion/request buffers are released, new ones are set up, the
 * core's memory pages are registered as a memory region, one receive is
 * posted per queue slot, and the connection's fd is removed from the
 * acceptor's epoll set. The command and completion that were in slot 0
 * before the rebuild are copied back into the new slot 0.
 *
 * Outside a core thread only the first request's `registered` flag is reset.
 *
 * The `req` parameter is not used.
 *
 * Returns 0 on success, -1 on failure.
 */
static int lich_nvmf_rdma_session_add_conn(struct lich_nvmf_session *session,
                struct lich_nvmf_conn *conn, struct lich_nvmf_request **req)
{
        int ret, i;
        core_t *core = NULL;
        struct lich_nvmf_rdma_session	*rdma_sess = get_rdma_sess(session);
        struct lich_nvmf_rdma_conn	*rdma_conn = get_rdma_conn(conn);
        struct lich_nvmf_rdma_request	*rdma_req;
        struct rdma_cm_id *id = rdma_conn->cm_id;

        if (rdma_sess->verbs != NULL) {
                if (rdma_sess->verbs != rdma_conn->cm_id->verbs) {
                        DERROR("Two connections belonging to the same session cannot connect using different RDMA devices.\n");
                        return -1;
                }
        }

        rdma_sess->verbs = rdma_conn->cm_id->verbs;
        rdma_sess->pd = id->pd;

        core = core_self();
        if (core) {
                /* Keep copies of slot 0: the buffers backing cmds/cpls are
                 * released and re-created below. */
                union nvmf_h2c_msg cmd = rdma_conn->cmds[0];
                union nvmf_c2h_msg rsp = rdma_conn->cpls[0];
                rdma_conn->max_queue_depth = rdma_conn->negotiated_queue_depth;

                /***  dereg and free  ***/
                nvmf_rdma_buf_dereg_mgr(conn);

                DINFO("__g_rdma__.max_queue_depth: %d, rdma_conn->max_queue_depth: %d, pd %p\n",
                                __g_rdma__.max_queue_depth, rdma_conn->max_queue_depth, rdma_sess->pd);

                /* One command and one completion slot per queue entry. */
                mbuffer_init(&rdma_conn->cmd_buf, rdma_conn->max_queue_depth * sizeof(*rdma_conn->cmds));
                rdma_conn->cmds = ((seg_t *)rdma_conn->cmd_buf.list.next)->handler.ptr;

                mbuffer_init(&rdma_conn->cpl_buf, rdma_conn->max_queue_depth * sizeof(*rdma_conn->cpls));
                rdma_conn->cpls = ((seg_t *)rdma_conn->cpl_buf.list.next)->handler.ptr;

                rdma_conn->mr = rdma_reg_mr(id->pd, core->tls[VARIABLE_CORE], get_core_mempages_size());
                if (unlikely(!rdma_conn->mr)) {
                        DERROR("nvmf hugepge register mr fail.\n");
                        return -1;
                }

                for (i = 0; i < rdma_conn->max_queue_depth; i++) {
                        rdma_req = &rdma_conn->reqs[i];

                        mbuffer_init(&rdma_req->buf, __g_rdma__.in_capsule_data_size);
                        rdma_req->req.cmd = &rdma_conn->cmds[i];
                        rdma_req->req.rsp = &rdma_conn->cpls[i];
                        rdma_req->req.conn = &rdma_conn->conn;

                        if (nvmf_post_rdma_recv(&rdma_req->req)) {
                                DERROR("Unable to post capsule for RDMA RECV\n");
                                lich_nvmf_rdma_conn_destroy(rdma_conn);
                                return -1;
                        }

                        if (i == 0) {
                                /* Restore the command/completion saved above. */
                                *rdma_req->req.cmd = cmd;
                                *rdma_req->req.rsp = rsp;
                                rdma_req->req.registered = 0;
                        }
                }

                /* The connection's fd is taken out of the acceptor's epoll set. */
                ret = epoll_ctl(__g_rdma__.ep_fd, EPOLL_CTL_DEL, rdma_conn->fd, NULL);
                if (ret) {
                        /* Report the operation and fd that actually failed. */
                        DERROR("Nvmf epoll ctl del fd:%d failed\n", rdma_conn->fd);
                        return -1;
                }

                //yfree((void **)__g_rdma__.tev); todo. invalid free.
        } else {
                (void)req;
                rdma_req = &rdma_conn->reqs[0];
                rdma_req->req.registered = 0;
        }

        return 0;
}

/*
 * Transport hook for detaching a connection from a session.
 * The RDMA transport has nothing to do here; both arguments are ignored and
 * the call always succeeds.
 */
static int lich_nvmf_rdma_session_remove_conn(struct lich_nvmf_session *session,
                struct lich_nvmf_conn *conn)
{
        (void)conn;
        (void)session;

        return 0;
}

/*
 * Finish a request after it has been executed.
 *
 * A successful controller-to-host request first has its data transferred to
 * the host; every other request goes straight to sending the completion.
 * Returns whatever the chosen step returns.
 */
static int lich_nvmf_rdma_request_complete(struct lich_nvmf_request *req)
{
        struct spdk_nvme_cpl *cpl = &req->rsp->nvme_cpl;

        if (cpl->status.sc != SPDK_NVME_SC_SUCCESS ||
                        req->xfer != SPDK_NVME_DATA_CONTROLLER_TO_HOST) {
                return lich_nvmf_rdma_request_send_completion(req);
        }

        return lich_nvmf_rdma_request_transfer_data(req);
}

/*
 * Transport hook for releasing a request; forwards to the completion
 * acknowledgement routine and returns its result.
 */
static int lich_nvmf_rdma_request_release(struct lich_nvmf_request *req)
{
        int status = lich_nvmf_rdma_request_ack_completion(req);

        return status;
}

/*
 * Transport hook for closing a connection: destroys the RDMA connection
 * that embeds the given generic connection.
 */
static void lich_nvmf_rdma_close_conn(struct lich_nvmf_conn *conn)
{
        struct lich_nvmf_rdma_conn *rdma_conn = get_rdma_conn(conn);

        /* A void function must not return an expression, so call it as a
         * plain statement. */
        lich_nvmf_rdma_conn_destroy(rdma_conn);
}

/*
 * Event-handler adapter around lich_nvmf_rdma_poll().
 * `data` is the RDMA connection to poll; the remaining arguments are unused
 * and the poll result is discarded.
 */
static void __lich_nvmf_rdma_poll(int fd, int type, int events, void *data, void *core)
{
        struct lich_nvmf_rdma_conn *target = data;

        (void)core;
        (void)events;
        (void)type;
        (void)fd;

        (void)lich_nvmf_rdma_poll(&target->conn);
}

/*
 * Reap up to 32 work completions from the connection's completion queue and
 * advance the corresponding requests.
 *
 *  - SEND completed:       the request's completion is acknowledged.
 *  - RDMA WRITE completed: the completion is sent to the host, then queued
 *                          RDMA transfers are started.
 *  - RDMA READ completed:  the request is executed, then queued RDMA
 *                          transfers are started.
 *  - RECV completed:       a new command arrived; its data buffer is
 *                          prepared and the request is executed, has its
 *                          data fetched, or is completed with an error.
 *
 * Returns the number of requests handed to lich_nvmf_request_exec() plus
 * whatever lich_nvmf_rdma_handle_pending_rdma_rw() reported, or -1 on the
 * first error. Completions after the failing one are not processed.
 */
int lich_nvmf_rdma_poll(struct lich_nvmf_conn *conn)
{
        struct ibv_wc wc[32];
        struct lich_nvmf_rdma_conn *rdma_conn = get_rdma_conn(conn);
        struct lich_nvmf_rdma_request *rdma_req;
        struct lich_nvmf_request *req;
        int reaped, i, rc;
        int count = 0;

        /* Poll for completing operations. */
        rc = ibv_poll_cq(rdma_conn->cq, 32, wc);
        if (rc < 0) {
                DERROR("Error polling CQ! (%d): %s\n",
                                errno, strerror(errno));
                return -1;
        }

        reaped = rc;
        for (i = 0; i < reaped; i++) {

                if (wc[i].status) {
                        /* wr_id is a 64-bit value; print it as real hex. */
                        DERROR("CQ error on Connection %p, Request 0x%" PRIx64 " (%d): %s\n",
                                        conn, (uint64_t)wc[i].wr_id, wc[i].status,
                                        ibv_wc_status_str(wc[i].status));
                        return -1;
                }

                /* The work request id carries the request pointer. */
                rdma_req = (struct lich_nvmf_rdma_request *)wc[i].wr_id;
                if (rdma_req == NULL) {
                        DERROR("NULL wr_id in RDMA work completion\n");
                        return -1;
                }

                req = &rdma_req->req;
                switch (wc[i].opcode) {
                case IBV_WC_SEND:
                        YASSERT(rdma_conn->cur_queue_depth > 0);
                        rc = lich_nvmf_rdma_request_ack_completion(req);
                        if (rc) {
                                return -1;
                        }
                        break;

                case IBV_WC_RDMA_WRITE:
                        rc = lich_nvmf_rdma_request_send_completion(req);
                        if (rc) {
                                return -1;
                        }

                        /* Since an RDMA R/W operation completed, try to submit from the pending list. */
                        rdma_conn->cur_rdma_rw_depth--;
                        rc = lich_nvmf_rdma_handle_pending_rdma_rw(conn);
                        if (rc < 0) {
                                return -1;
                        }
                        count += rc;
                        break;

                case IBV_WC_RDMA_READ:
                        rc = lich_nvmf_request_exec(req);
                        if (rc) {
                                return -1;
                        }
                        count++;

                        /* Since an RDMA R/W operation completed, try to submit from the pending list. */
                        rdma_conn->cur_rdma_rw_depth--;
                        rc = lich_nvmf_rdma_handle_pending_rdma_rw(conn);
                        if (rc < 0) {
                                return -1;
                        }
                        count += rc;
                        break;

                case IBV_WC_RECV:
                        if (wc[i].byte_len < sizeof(struct spdk_nvmf_capsule_cmd)) {
                                DERROR("recv length %u less than capsule header\n", wc[i].byte_len);
                                return -1;
                        }

                        rdma_conn->cur_queue_depth++;

                        /* Start from a clean response before decoding the command. */
                        memset(req->rsp, 0, sizeof(*req->rsp));
                        rc = lich_nvmf_request_prep_data(req);
                        switch (rc) {
                        case LICH_NVMF_REQUEST_PREP_READY:
                                /* Data is immediately available */
                                rc = lich_nvmf_request_exec(req);
                                if (rc < 0) {
                                        return -1;
                                }
                                count++;
                                break;
                        case LICH_NVMF_REQUEST_PREP_PENDING_DATA:
                                /* Host data has to be fetched before execution. */
                                rc = lich_nvmf_rdma_request_transfer_data(req);
                                if (rc < 0) {
                                        return -1;
                                }
                                break;
                        case LICH_NVMF_REQUEST_PREP_ERROR:
                                /* The error status is already in the response. */
                                lich_nvmf_rdma_request_complete(req);
                                break;
                        }
                        break;

                default:
                        DERROR("Received an unknown opcode on the CQ: %d\n", wc[i].opcode);
                        return -1;
                }
        }

        return count;
}

/*
 * Acceptor event loop: waits on the transport's epoll instance (200 ms
 * timeout, up to 16 events per round) and invokes the handler stored in each
 * ready event's event_data. The loop never terminates.
 */
static void __lich_nvmf_acceptor_poll(void)
{
        int nevent, i;
        struct epoll_event events[16];
        struct event_data *tev;

        while (1) {
                nevent = epoll_wait(__g_rdma__.ep_fd, events, 16, 200);
                if (nevent < 0) {
                        if (errno == EINTR) {
                                /* A signal interrupted the wait; just retry. */
                                DBUG("file recv loop interrupted by signal\n");
                        } else {
                                /* A genuine failure must not be reported as a
                                 * harmless signal interruption. */
                                DERROR("Nvmf acceptor epoll_wait failed: %s\n", strerror(errno));
                        }
                        continue;
                }

                for (i = 0; i < nevent; i++) {
                        tev = (struct event_data *) events[i].data.ptr;
                        tev->handler(tev->fd, 0, events[i].events, tev->data, NULL);
                }
        }
}

static int lich_nvmf_epoll_add(void)
{
        int ret;
        struct epoll_event ev;
        struct event_data *tev;

        ret = ymalloc((void **)&tev, sizeof(*tev));
        if (ret) {
                ret = ENOMEM;
                GOTO(err_ret, ret);
        }

        tev->data = NULL;
        tev->handler = __lich_nvmf_rdma_acceptor_poll;
        tev->fd = __g_rdma__.event_channel->fd ;

        memset(&ev, 0, sizeof(ev));
        ev.events = EPOLLIN;
        ev.data.ptr = tev;
        __g_rdma__.tev = tev;

        ret = epoll_ctl(__g_rdma__.ep_fd, EPOLL_CTL_ADD, __g_rdma__.event_channel->fd, &ev);
        if (ret) {
                DERROR("Nvmf epoll ctl add fd:%d failed\n", __g_rdma__.event_channel->fd);
                GOTO(err_ret, ret);
        }

        return 0;

err_ret:
        return ret;
}

/*
 * Release the buffers owned by a connection: the command buffer, the
 * completion buffer and the data buffer of each of the first
 * max_queue_depth requests.
 */
void nvmf_rdma_buf_dereg_mgr(struct lich_nvmf_conn *conn)
{
        struct lich_nvmf_rdma_conn *rconn = get_rdma_conn(conn);
        int slot = 0;

        mbuffer_free(&rconn->cmd_buf);
        mbuffer_free(&rconn->cpl_buf);

        while (slot < rconn->max_queue_depth) {
                mbuffer_free(&rconn->reqs[slot].buf);
                slot++;
        }
}

/*
 * Operations table that exposes the RDMA implementation in this file
 * through the generic transport interface.
 */
const struct lich_nvmf_transport lich_nvmf_transport_rdma = {
        .name = "rdma",
        /* Global transport setup/teardown and acceptor event loop. */
        .transport_init = lich_nvmf_rdma_init,
        .transport_fini = lich_nvmf_rdma_fini,
        .transport_ep_add = lich_nvmf_epoll_add,
        .acceptor_poll = __lich_nvmf_acceptor_poll,

        /* Listen address management and discovery log reporting. */
        .listen_addr_add = lich_nvmf_rdma_listen,
        .listen_addr_discover = lich_nvmf_rdma_discover,

        /* Session lifecycle and request completion. */
        .session_init = lich_nvmf_rdma_session_init,
        .session_fini = lich_nvmf_rdma_session_fini,
        .session_add_conn = lich_nvmf_rdma_session_add_conn,
        .session_remove_conn = lich_nvmf_rdma_session_remove_conn,
        .req_complete = lich_nvmf_rdma_request_complete,
        .req_release = lich_nvmf_rdma_request_release,

        /* Per-connection teardown and completion-queue polling. */
        .conn_fini = lich_nvmf_rdma_close_conn,
        .conn_poll = lich_nvmf_rdma_poll,
};

